package com.atguigu.demo;

import com.atguigu.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * Flink SQL demo: joins an order table with a payment table and writes the
 * combined rows into a third table.
 *
 * <p>Sample order records:
 * <pre>
 * {"id":"101","user_id":"u_101","status":"1001", "amount":200,"ts":1000}
 * {"id":"102","user_id":"u_102","status":"1001", "amount":400,"ts":11000}
 * {"id":"103","user_id":"u_103","status":"1001", "amount":600,"ts":15000}
 * </pre>
 *
 * <p>Sample payment records:
 * <pre>
 * {"id":"55","order_id":"101","type":"wx"  ,"ts":11000}
 * {"id":"56","order_id":"102","type":"ali" ,"ts":42000}
 * </pre>
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol start="0">
 *   <li>set up the environment</li>
 *   <li>define the left table (Kafka)</li>
 *   <li>define the right table (Kafka)</li>
 *   <li>build the join query</li>
 *   <li>define the target table (Kafka)</li>
 *   <li>write the join result into the target table</li>
 * </ol>
 */
public class FlinkSQLJoinApp {

    /** Idle-state retention handed to the table config (20 000 ms). */
    private static final Duration IDLE_STATE_RETENTION = Duration.ofMillis(20_000L);

    /** Column part of the {@code order_info} DDL; the connector clause is appended in {@code main}. */
    private static final String ORDER_INFO_SCHEMA =
            "   CREATE TABLE order_info (\n"
            + "         `id`   STRING,\n"
            + "           `user_id` STRING,\n"
            + "          `status` STRING,\n"
            + "          `amount` STRING,\n"
            + "          `ts` STRING\n"
            + "        )  ";

    /** Column part of the {@code payment_info} DDL; the connector clause is appended in {@code main}. */
    private static final String PAYMENT_INFO_SCHEMA =
            "   CREATE TABLE payment_info (\n"
            + "         `id`   STRING,\n"
            + "           `order_id` STRING,\n"
            + "          `type` STRING,\n"
            + "          `ts` STRING\n"
            + "        )  ";

    /**
     * Query producing the wide rows: every order column plus the payment id, type and ts,
     * matched on {@code o.id = p.order_id}. Note that this is a LEFT join, not an inner join.
     */
    private static final String ORDER_PAYMENT_JOIN =
            " select o.id,o.user_id,o.status,o.amount,o.ts,\n"
            + "         p.id as pay_id,p.type as pay_type,p.ts as pay_ts\n"
            + "         from order_info o left join  payment_info p on o.id=p.order_id";

    /** Column part of the {@code order_pay_wide} DDL, keyed on {@code id} (NOT ENFORCED). */
    private static final String ORDER_PAY_WIDE_SCHEMA =
            "   CREATE TABLE order_pay_wide (\n"
            + "         `id`   STRING,\n"
            + "           `user_id` STRING,\n"
            + "          `status` STRING,\n"
            + "          `amount` STRING,\n"
            + "          `ts` STRING,\n"
            + "          `pay_id` STRING,\n"
            + "          `pay_type` STRING,\n"
            + "          `pay_ts` STRING ,\n"
            + "           PRIMARY KEY (id) NOT ENFORCED\n"
            + "        )  ";

    /**
     * Registers the two source tables and the target table, then submits the
     * {@code INSERT INTO order_pay_wide} statement built from the join query.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Step 0: set up the stream environment and the table environment on top of it.
        final StreamTableEnvironment tables =
                StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());

        // Set the state expiry (idle-state retention) time.
        tables.getConfig().setIdleStateRetention(IDLE_STATE_RETENTION);

        // Step 1: left table (orders).
        // NOTE(review): getKafkaDDL presumably returns the connector WITH-clause for the
        // given topic and consumer group -- confirm in KafkaUtil.
        final String orderInfoDdl =
                ORDER_INFO_SCHEMA + KafkaUtil.getKafkaDDL("demo_left_topic", "flink_sql_join");
        tables.executeSql(orderInfoDdl);

        // Step 2: right table (payments).
        final String paymentInfoDdl =
                PAYMENT_INFO_SCHEMA + KafkaUtil.getKafkaDDL("demo_right_topic", "flink_sql_join");
        tables.executeSql(paymentInfoDdl);

        // Step 3: the join itself is the constant ORDER_PAYMENT_JOIN; it only runs as part
        // of the INSERT statement below.

        // Step 4: target table.
        final String orderPayWideDdl =
                ORDER_PAY_WIDE_SCHEMA + KafkaUtil.getUpsertKafkaDDL("demo_order_pay_wide");
        tables.executeSql(orderPayWideDdl);

        // Step 5: write the join result into the target table.
        final String insertStatement = "insert into  order_pay_wide " + ORDER_PAYMENT_JOIN;
        tables.executeSql(insertStatement);
    }
}
